package com.async;

import canal.bean.RowData;
import com.bean.*;
import com.utils.RedisUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/**
 * Joins order-detail fact rows with dimension data stored in Redis and emits one
 * widened {@link OrderGoodsWideEntity} per input row.
 *
 * <p>A rich function is used so that the Redis connection can be acquired in
 * {@link #open(Configuration)} and released in {@link #close()}. The stated purpose of
 * going through Flink's async I/O operator is to raise throughput.
 *
 * <p>NOTE(review): the Redis lookups below are issued synchronously inside
 * {@link #asyncInvoke(RowData, ResultFuture)} on a single shared {@link Jedis} handle, so
 * the calling thread waits for every lookup. Consider moving the lookups to an executor
 * or an asynchronous client if real request concurrency is required.
 *
 * <p>Author: Sky, created 2021/8/10 21:32.
 */
public class AsyncOrderDetailRedisRequest extends RichAsyncFunction<RowData, OrderGoodsWideEntity> {

    /** Index of the Redis database selected on every connection. */
    private static final int DIM_DB_INDEX = 1;

    /** Redis hash holding goods dimension rows, keyed by goods id. */
    private static final String DIM_GOODS_KEY = "foo_shop:dim_goods";
    /** Redis hash holding shop dimension rows, keyed by shop id. */
    private static final String DIM_SHOPS_KEY = "foo_shop:dim_shops";
    /** Redis hash holding goods-category dimension rows, keyed by category id. */
    private static final String DIM_GOODS_CATS_KEY = "foo_shop:dim_goods_cats";
    /** Redis hash holding organisation dimension rows, keyed by organisation id. */
    private static final String DIM_ORG_KEY = "foo_shop:dim_org";

    /** Connection used for all lookups; transient because it is created in {@code open}. */
    private transient Jedis jedis;

    /**
     * Acquires the Redis connection used by this function instance.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if the connection cannot be obtained or the database selected
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        jedis = connect();
    }

    /**
     * Looks up the goods, shop, category and organisation dimensions for one order-detail
     * row and completes {@code resultFuture} with the widened record.
     *
     * <p>The future is always completed: with an empty collection when the row has no
     * {@code goodsId} column value, with a single widened record on success, and
     * exceptionally when any lookup or conversion fails (for example when a dimension
     * entry is missing from Redis).
     *
     * @param rowData      the order-detail row to enrich
     * @param resultFuture the future that receives the result of the join
     * @throws Exception declared by the overridden method; failures are reported through
     *                   {@code resultFuture} instead
     */
    @Override
    public void asyncInvoke(RowData rowData, ResultFuture<OrderGoodsWideEntity> resultFuture) throws Exception {
        // A row without a goods id cannot be joined. Complete with no output so the
        // record does not sit in the operator until it times out.
        if (rowData.getColumns().get("goodsId") == null) {
            resultFuture.complete(Collections.emptyList());
            return;
        }

        try {
            ensureConnected();
            resultFuture.complete(Collections.singletonList(buildWideRow(rowData)));
        } catch (Exception e) {
            // Report the failure through the future so the operator sees the real cause.
            resultFuture.completeExceptionally(e);
        }
    }

    /**
     * Releases the Redis connection, if one was acquired.
     *
     * @throws Exception if releasing the connection fails
     */
    @Override
    public void close() throws Exception {
        if (jedis != null) {
            // Close unconditionally so that a broken handle is released as well.
            jedis.close();
            jedis = null;
        }
    }

    /**
     * Invoked when the dimension lookup for {@code input} timed out. Overriding this
     * method replaces the default behaviour of failing with an exception: the event is
     * logged and the record is dropped by completing the future with no output.
     *
     * @param input        the row whose lookup timed out
     * @param resultFuture the future belonging to that row
     */
    @Override
    public void timeout(RowData input, ResultFuture<OrderGoodsWideEntity> resultFuture) {
        System.out.println("订单明细实时拉宽操作的时候，与维度数据进行关联操作超时了");
        // The future must still be completed, otherwise the record is never released.
        resultFuture.complete(Collections.emptyList());
    }

    /** Obtains a connection from {@link RedisUtil} and selects the dimension database. */
    private Jedis connect() {
        Jedis connection = RedisUtil.getJedis().getResource();
        try {
            connection.select(DIM_DB_INDEX);
        } catch (RuntimeException e) {
            // Do not leak the handle when the database cannot be selected.
            connection.close();
            throw e;
        }
        return connection;
    }

    /** Replaces the current connection with a fresh one if it is missing or disconnected. */
    private void ensureConnected() {
        if (jedis != null && jedis.isConnected()) {
            return;
        }
        Jedis stale = jedis;
        jedis = null;
        if (stale != null) {
            try {
                stale.close();
            } catch (RuntimeException ignored) {
                // Best effort: the stale handle is being discarded anyway.
            }
        }
        jedis = connect();
    }

    /** Performs the dimension lookups for one row and assembles the widened record. */
    private OrderGoodsWideEntity buildWideRow(RowData rowData) {
        // 1: goods details, looked up by the row's goods id.
        String goodsJson = jedis.hget(DIM_GOODS_KEY, rowData.getColumns().get("goodsId"));
        DimGoodsDBEntity dimGoods = DimGoodsDBEntity.getGoodInfo(goodsJson);

        // 2: shop details, looked up by the goods' shop id.
        String shopJson = jedis.hget(DIM_SHOPS_KEY, dimGoods.getShopId().toString());
        System.out.println(shopJson);
        DimShopsDBEntity dimShops = DimShopsDBEntity.getDimShops(shopJson);

        // 3: category chain, walking parent ids from the goods' category upwards.
        // 3.1: third-level category.
        String thirdCatJson = jedis.hget(DIM_GOODS_CATS_KEY, dimGoods.getGoodsCatId().toString());
        DimGoodsCatDBEntity dimThirdCat = DimGoodsCatDBEntity.getDimGoodCat(thirdCatJson);
        // 3.2: second-level category.
        String secondCatJson = jedis.hget(DIM_GOODS_CATS_KEY, dimThirdCat.getParentId());
        DimGoodsCatDBEntity dimSecondCat = DimGoodsCatDBEntity.getDimGoodCat(secondCatJson);
        // 3.3: first-level category.
        String firstCatJson = jedis.hget(DIM_GOODS_CATS_KEY, dimSecondCat.getParentId());
        DimGoodsCatDBEntity dimFirstCat = DimGoodsCatDBEntity.getDimGoodCat(firstCatJson);

        // 4: organisation data, starting from the shop's area id.
        // 4.1: city, looked up by area id.
        String cityJson = jedis.hget(DIM_ORG_KEY, dimShops.getAreaId().toString());
        DimOrgDBEntity dimOrgCity = DimOrgDBEntity.getDimOrg(cityJson);
        // 4.2: region, looked up by the city's parent id.
        String regionJson = jedis.hget(DIM_ORG_KEY, dimOrgCity.getParentId().toString());
        DimOrgDBEntity dimOrgRegion = DimOrgDBEntity.getDimOrg(regionJson);

        // Assemble the widened order-detail record.
        OrderGoodsWideEntity orderGoodsWide = new OrderGoodsWideEntity();
        // NOTE(review): the "ogId" column is stored through setOrgId; confirm that this
        // is the intended target field and not a typo for an order-goods id setter.
        orderGoodsWide.setOrgId(Long.valueOf(rowData.getColumns().get("ogId")));
        orderGoodsWide.setOrderId(Long.valueOf(rowData.getColumns().get("orderId")));
        orderGoodsWide.setGoodsId(Long.valueOf(rowData.getColumns().get("goodsId")));
        orderGoodsWide.setGoodsNum(Long.valueOf(rowData.getColumns().get("goodsNum")));
        orderGoodsWide.setGoodsPrice(Double.parseDouble(rowData.getColumns().get("goodsPrice")));

        // The goods name comes from the goods dimension only; it must not be replaced
        // by the category name.
        orderGoodsWide.setGoodsName(dimGoods.getGoodsName());
        orderGoodsWide.setShopId(Long.valueOf(dimShops.getShopId()));
        orderGoodsWide.setGoodsThirdCatId(Integer.parseInt(dimThirdCat.getCatId()));
        orderGoodsWide.setGoodsThirdCatName(dimThirdCat.getCatName());
        orderGoodsWide.setGoodsSecondCatId(Integer.parseInt(dimSecondCat.getCatId()));
        orderGoodsWide.setGoodsSecondCatName(dimSecondCat.getCatName());
        orderGoodsWide.setGoodsFirstCatId(Integer.parseInt(dimFirstCat.getCatId()));
        orderGoodsWide.setGoodsFirstCatName(dimFirstCat.getCatName());
        orderGoodsWide.setAreaId(dimShops.getAreaId());
        orderGoodsWide.setShopName(dimShops.getShopName());
        orderGoodsWide.setShopCompany(dimShops.getShopCompany());
        orderGoodsWide.setCityId(dimOrgCity.getOrgId());
        orderGoodsWide.setCityName(dimOrgCity.getOrgName());

        orderGoodsWide.setRegionId(dimOrgRegion.getOrgId());
        orderGoodsWide.setRegionName(dimOrgRegion.getOrgName());
        orderGoodsWide.setOrgName(dimOrgCity.getOrgName());

        return orderGoodsWide;
    }

}
